# [Flink]Flink-connector-http > 下面展示如何通过Flink去请求http接口或者将数据发送给http接口 ## Source 准备工作,需要在maven中引入依赖: ~~~xml org.apache.httpcomponents httpclient 4.5.10 ~~~ 一个HttpUtil,往上很多工具,用来实际发送http请求,详见附录部分 ### Get ~~~java public class HttpGetSource extends RichSourceFunction { private volatile boolean isRunning = true; private String url; private long requestInterval; private DeserializationSchema deserializer; // count out event private transient Counter counter; public HttpGetSource(String url, long requestInterval, DeserializationSchema deserializer) { this.url = url; this.requestInterval = requestInterval; this.deserializer = deserializer; } @Override public void open(Configuration parameters) throws Exception { counter = new SimpleCounter(); this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); } @Override public void run(SourceContext ctx) throws Exception { while (isRunning) { try { // receive http message, csv format String message = HttpUtil.doGet(url); // deserializer csv message ctx.collect(deserializer.deserialize(message.getBytes()).toString()); this.counter.inc(); Thread.sleep(requestInterval); } catch (Exception e) { e.printStackTrace(); } } } @Override public void cancel() { isRunning = false; } } ~~~ ### Post ~~~java public class HttpPostSource extends RichSourceFunction { private volatile boolean isRunning = true; private String url; private long requestInterval; private DeserializationSchema deserializer; // count out event private transient Counter counter; private String body; public HttpPostSource(String url, long requestInterval, String body, DeserializationSchema deserializer) { this.url = url; this.requestInterval = requestInterval; this.deserializer = deserializer; this.body = body; } @Override public void open(Configuration parameters) throws Exception { counter = new SimpleCounter(); this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); } @Override public void run(SourceContext ctx) throws Exception { while (isRunning) { try { // receive http message, csv format String message = HttpUtil.doPost(url, body, 1000); // deserializer csv message ctx.collect(deserializer.deserialize(message.getBytes()).toString()); this.counter.inc(); Thread.sleep(requestInterval); } catch (Exception e) { e.printStackTrace(); } } } @Override public void cancel() { isRunning = false; } } ~~~ ### 使用 - 使用Post ~~~java DataStreamSource streamSource = env.addSource(new HttpPostSource(URL, 1000, "", new SimpleStringSchema())); ~~~ - 使用Get ~~~java DataStreamSource streamSource = env.addSource(new HttpGetSource(URL, 1000, new SimpleStringSchema())); ~~~ ## Sink 同样可以将数据使用http发送出去,sink到其他端 这里使用别人已经写好的工具,需要在maven中引入,由于该依赖并不存在于中央仓库,开发者说明了使用方法,也就是去github上将仓库clone到本地,再使用`maven clean install`在本地编译并打入本地仓库之后,即可在项目pom中引入 ~~~xml net.galgus flink-connector-http 1.0-SNAPSHOT ~~~ 先定义配置条件 ~~~java //设置endpoint String endpoint = "http://localhost:8080/api/postdata/"; //设置header HashMap headerMap = new HashMap<>(); headerMap.put("Content-Type", "application/json"); HTTPConnectionConfig httpConnectionConfig = new HTTPConnectionConfig( endpoint, HTTPConnectionConfig.HTTPMethod.POST, headerMap, false ); ~~~ 在数据流中添加sink即可: ~~~java stream.addSink(new HTTPSink<>(httpConnectionConfig)) ~~~ ## 附录 - HttpUtil ~~~java public class HttpUtil { private static final Logger log = LoggerFactory.getLogger(HttpUtil.class); /** * 默认超时时间 */ private static final int DEFAULT_TIME_OUT = 3000; /** * get请求,超时时间默认 * @param api 请求URL * @return 响应JSON字符串 */ public static String doGet(String api) { return doGet(api, DEFAULT_TIME_OUT); } /** * get请求,超时时间传参 * @param api 请求URL * @param timeOut 请求超时时间(毫秒) * @return 响应JSON字符串 */ public static String doGet(String api, int timeOut) { HttpGet httpGet = new HttpGet(api); RequestConfig config = RequestConfig.custom() .setConnectTimeout(timeOut) .setConnectionRequestTimeout(timeOut) .build(); httpGet.setConfig(config); try (CloseableHttpClient client = HttpClients.createDefault(); CloseableHttpResponse response = client.execute(httpGet)) { return EntityUtils.toString(response.getEntity()); } catch (IOException e) { log.error("get " + api + " failed!", e); } return null; } /** * post请求,超时时间默认 * @param api 请求URL * @param body 请求体JSON字符串 * @return 响应JSON字符串 */ public static String doPost(String api, String body) { return doPost(api, body, DEFAULT_TIME_OUT); } /** * post请求,超时时间传参 * @param api 请求URL * @param body 请求体JSON字符串 * @param timeOut 请求超时时间(毫秒) * @return 响应JSON字符串 */ public static String doPost(String api, String body, int timeOut) { HttpPost httpPost = new HttpPost(api); StringEntity entity = new StringEntity(body, "utf-8"); entity.setContentType("application/json"); entity.setContentEncoding("utf-8"); httpPost.setEntity(entity); RequestConfig config = RequestConfig.custom() .setConnectTimeout(timeOut) .setConnectionRequestTimeout(timeOut) .build(); httpPost.setConfig(config); try (CloseableHttpClient client = HttpClients.createDefault(); CloseableHttpResponse response = client.execute(httpPost)) { return EntityUtils.toString(response.getEntity()); } catch (IOException e) { log.error("post " + api + " failed!", e); } return null; } } ~~~ - 完整测试pom文件 ~~~xml 4.0.0 me.roohom flink-http 1.0 8 8 1.12.2 2.1.0 org.apache.flink flink-clients_2.11 ${flink.version} provided org.apache.flink flink-table-api-java-bridge_2.11 ${flink.version} provided org.apache.flink flink-table-planner-blink_2.11 ${flink.version} provided org.apache.flink flink-streaming-java_2.11 ${flink.version} provided org.apache.flink flink-connector-kafka_2.11 ${flink.version} provided org.apache.flink flink-runtime-web_2.11 ${flink.version} compile org.projectlombok lombok 1.18.22 com.fasterxml.jackson.core jackson-databind 2.7.4 org.slf4j slf4j-simple 1.7.25 org.apache.httpcomponents httpclient 4.5.10 net.galgus flink-connector-http 1.0-SNAPSHOT ~~~